---
title: flow_engine
sidebarTitle: flow_engine
---

# `prefect.flow_engine`

## Functions

### `load_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L136" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
load_flow_run(flow_run_id: UUID) -> FlowRun
```

### `load_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L142" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
load_flow(flow_run: FlowRun) -> Flow[..., Any]
```

### `load_flow_and_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L158" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]
```

### `run_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1406" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
```

### `run_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1430" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Union[R, State, None]
```

### `run_generator_flow_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1454" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_generator_flow_sync(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> Generator[R, None, None]
```

### `run_generator_flow_async` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1495" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_generator_flow_async(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', context: Optional[dict[str, Any]] = None) -> AsyncGenerator[R, None]
```

### `run_flow` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1538" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_flow(flow: Flow[P, R], flow_run: Optional[FlowRun] = None, parameters: Optional[Dict[str, Any]] = None, wait_for: Optional[Iterable[PrefectFuture[R]]] = None, return_type: Literal['state', 'result'] = 'result', error_logger: Optional[logging.Logger] = None, context: Optional[dict[str, Any]] = None) -> R | State | None | Coroutine[Any, Any, R | State | None] | Generator[R, None, None] | AsyncGenerator[R, None]
```

### `run_flow_in_subprocess` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1611" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_flow_in_subprocess(flow: 'Flow[..., Any]', flow_run: 'FlowRun | None' = None, parameters: dict[str, Any] | None = None, wait_for: Iterable[PrefectFuture[Any]] | None = None, context: dict[str, Any] | None = None) -> multiprocessing.context.SpawnProcess
```


Run a flow in a subprocess.

Note the result of the flow will only be accessible if the flow is configured to
persist its result.

**Args:**
- `flow`: The flow to run.
- `flow_run`: The flow run object containing run metadata.
- `parameters`: The parameters to use when invoking the flow.
- `wait_for`: The futures to wait for before starting the flow.
- `context`: A serialized context to hydrate before running the flow. If not provided,
the current context will be used. A serialized context should be provided if
this function is called in a separate memory space from the parent run (e.g.
in a subprocess or on another machine).

**Returns:**
- A multiprocessing.context.SpawnProcess representing the process that is running the flow.


## Classes

### `FlowRunTimeoutError` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L132" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>


Raised when a flow run exceeds its defined timeout.


### `BaseFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L165" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

**Methods:**

#### `cancel_all_tasks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L203" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
cancel_all_tasks(self) -> None
```

#### `is_pending` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L198" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
is_pending(self) -> bool
```

#### `is_running` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L193" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
is_running(self) -> bool
```

#### `state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L190" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
state(self) -> State
```

### `FlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L242" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

**Methods:**

#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L292" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
begin_run(self) -> State
```

#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L804" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]
```

Convenience method to call the flow function. Returns a coroutine if the
flow is async.


#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L538" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
call_hooks(self, state: Optional[State] = None) -> None
```

#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L248" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
client(self) -> SyncPrefectClient
```

#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L503" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
create_flow_run(self, client: SyncPrefectClient) -> FlowRun
```

#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L445" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_crash(self, exc: BaseException) -> None
```

#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L394" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
```

#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L374" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_success(self, result: R) -> R
```

#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L427" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_timeout(self, exc: TimeoutError) -> None
```

#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L690" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
initialize_run(self)
```

Enters a client context and creates a flow run if needed.


#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L454" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
load_subflow_run(self, parent_task_run: TaskRun, client: SyncPrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
```

This method attempts to load an existing flow run for a subflow task
run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not
being rerun, then we attempt to load an existing flow run instead of
creating a new one. This will prevent the engine from running the
subflow again.

If no existing flow run is found, or if the subflow should be rerun,
then no flow run is returned.


#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L348" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
```

#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L785" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_context(self)
```

#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L331" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
set_state(self, state: State, force: bool = False) -> State
```



#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L595" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
setup_run_context(self, client: Optional[SyncPrefectClient] = None)
```

#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L773" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
start(self) -> Generator[None, None, None]
```

### `AsyncFlowRunEngine` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L822" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>


Async version of the flow run engine.

NOTE: This has not been fully asyncified yet which may lead to async flows
not being fully asyncified.


**Methods:**

#### `begin_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L879" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
begin_run(self) -> State
```

#### `call_flow_fn` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1394" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
call_flow_fn(self) -> Coroutine[Any, Any, R]
```

Convenience method to call the flow function. Returns a coroutine if the
flow is async.


#### `call_hooks` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1122" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
call_hooks(self, state: Optional[State] = None) -> None
```

#### `client` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L835" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
client(self) -> PrefectClient
```

#### `create_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1089" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
create_flow_run(self, client: PrefectClient) -> FlowRun
```

#### `handle_crash` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1027" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_crash(self, exc: BaseException) -> None
```

#### `handle_exception` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L977" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_exception(self, exc: Exception, msg: Optional[str] = None, result_store: Optional[ResultStore] = None) -> State
```

#### `handle_success` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L960" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_success(self, result: R) -> R
```

#### `handle_timeout` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1008" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
handle_timeout(self, exc: TimeoutError) -> None
```

#### `initialize_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1272" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
initialize_run(self)
```

Enters a client context and creates a flow run if needed.


#### `load_subflow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1040" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
load_subflow_run(self, parent_task_run: TaskRun, client: PrefectClient, context: FlowRunContext) -> Union[FlowRun, None]
```

This method attempts to load an existing flow run for a subflow task
run, if appropriate.

If the parent task run is in a final but not COMPLETED state, and not
being rerun, then we attempt to load an existing flow run instead of
creating a new one. This will prevent the engine from running the
subflow again.

If no existing flow run is found, or if the subflow should be rerun,
then no flow run is returned.


#### `result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L935" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
result(self, raise_on_failure: bool = True) -> 'Union[R, State, None]'
```

#### `run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1375" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
run_context(self)
```

#### `set_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L918" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
set_state(self, state: State, force: bool = False) -> State
```



#### `setup_run_context` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1179" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
setup_run_context(self, client: Optional[PrefectClient] = None)
```

#### `start` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_engine.py#L1363" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
start(self) -> AsyncGenerator[None, None]
```
